---
title: "并发与异步模型"
description: "学习 Rust 的并发编程、异步模型和多线程安全，对比 JavaScript 的事件循环机制"
---

# 并发与异步模型

## 📖 学习目标

理解 Rust 的并发编程模型，包括多线程、异步编程和线程安全机制，对比 JavaScript 的单线程事件循环模型。

---

## 🎯 执行模型对比

### JavaScript 的事件循环

JavaScript 使用单线程事件循环模型：

<UniversalEditor title="JavaScript 事件循环" compare={true}>
```javascript !! js
// JavaScript 单线程事件循环
console.log("开始执行");

// 同步代码
const result = 1 + 2;
console.log("同步计算:", result);

// 异步代码 - 微任务
Promise.resolve().then(() => {
    console.log("微任务 1");
});

// 异步代码 - 宏任务
setTimeout(() => {
    console.log("宏任务 1");
}, 0);

// 更多微任务
Promise.resolve().then(() => {
    console.log("微任务 2");
});

console.log("结束执行");

// 输出顺序：
// 开始执行
// 同步计算: 3
// 结束执行
// 微任务 1
// 微任务 2
// 宏任务 1

// 异步函数
async function fetchData() {
    console.log("开始获取数据");
    
    // 模拟网络请求
    const response = await new Promise(resolve => {
        setTimeout(() => {
            resolve({ data: "Hello from server" });
        }, 1000);
    });
    
    console.log("数据获取完成:", response.data);
    return response.data;
}

// 调用异步函数
fetchData().then(data => {
    console.log("处理数据:", data);
});

console.log("继续执行其他代码");

// 并发处理（实际上是并发，不是并行）
async function processItems(items) {
    const promises = items.map(async (item) => {
        // 模拟异步处理
        await new Promise(resolve => setTimeout(resolve, 100));
        return `processed_${item}`;
    });
    
    const results = await Promise.all(promises);
    console.log("所有项目处理完成:", results);
}

processItems([1, 2, 3, 4, 5]);
```
</UniversalEditor>

虽然 JavaScript 本身是单线程的，但可以通过 **Web Workers** 实现多线程并发，将耗时的计算任务放到后台线程执行，避免阻塞主线程。这类似于 Rust 的多线程，但 Web Workers 之间不能直接共享内存，需要通过消息传递进行通信。

### Rust 的多线程模型

Rust 支持真正的多线程并行执行：

<UniversalEditor title="Rust 多线程" compare={true}>
```rust !! rs
use std::thread;
use std::time::Duration;

fn main() {
    println!("开始执行");
    
    // 同步代码
    let result = 1 + 2;
    println!("同步计算: {}", result);
    
    // 创建线程
    let handle = thread::spawn(|| {
        println!("线程 1 开始执行");
        thread::sleep(Duration::from_millis(100));
        println!("线程 1 执行完成");
    });
    
    // 主线程继续执行
    println!("主线程继续执行");
    
    // 等待线程完成
    handle.join().unwrap();
    println!("所有线程执行完成");
}

// 多线程数据处理
fn process_items_parallel(items: Vec<i32>) {
    let handles: Vec<_> = items.into_iter().map(|item| {
        thread::spawn(move || {
            // 模拟处理时间
            thread::sleep(Duration::from_millis(100));
            format!("processed_{}", item)
        })
    }).collect();
    
    // 收集所有线程的结果
    let results: Vec<String> = handles.into_iter()
        .map(|handle| handle.join().unwrap())
        .collect();
    
    println!("所有项目处理完成: {:?}", results);
}

// 使用线程池
use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel();
    
    // 发送线程
    thread::spawn(move || {
        let messages = vec![
            String::from("消息 1"),
            String::from("消息 2"),
            String::from("消息 3"),
        ];
        
        for msg in messages {
            tx.send(msg).unwrap();
            thread::sleep(Duration::from_millis(100));
        }
    });
    
    // 接收线程
    for received in rx {
        println!("收到: {}", received);
    }
}
```
</UniversalEditor>

### 执行模型差异

1. **单线程 vs 多线程**: JavaScript 单线程事件循环，Rust 支持多线程并行
2. **并发 vs 并行**: JavaScript 并发非并行，Rust 可以真正并行
3. **内存安全**: JavaScript 运行时检查，Rust 编译时保证线程安全
4. **性能**: JavaScript 受单线程限制，Rust 可以充分利用多核

---

## 🔒 线程安全与所有权

### 共享状态管理

<UniversalEditor title="线程安全" compare={true}>
```rust !! rs
use std::sync::{Arc, Mutex};
use std::thread;

// 线程安全的计数器
struct SafeCounter {
    count: Mutex<i32>,
}

impl SafeCounter {
    fn new() -> Self {
        SafeCounter {
            count: Mutex::new(0),
        }
    }
    
    fn increment(&self) {
        let mut count = self.count.lock().unwrap();
        *count += 1;
    }
    
    fn get_count(&self) -> i32 {
        *self.count.lock().unwrap()
    }
}

fn main() {
    let counter = Arc::new(SafeCounter::new());
    let mut handles = vec![];
    
    // 创建多个线程同时增加计数器
    for _ in 0..10 {
        let counter = Arc::clone(&counter);
        let handle = thread::spawn(move || {
            for _ in 0..100 {
                counter.increment();
            }
        });
        handles.push(handle);
    }
    
    // 等待所有线程完成
    for handle in handles {
        handle.join().unwrap();
    }
    
    println!("最终计数: {}", counter.get_count()); // 应该是 1000
}

// 使用 RwLock 进行读写分离
use std::sync::RwLock;

struct DataStore {
    data: RwLock<Vec<String>>,
}

impl DataStore {
    fn new() -> Self {
        DataStore {
            data: RwLock::new(Vec::new()),
        }
    }
    
    fn write(&self, item: String) {
        let mut data = self.data.write().unwrap();
        data.push(item);
    }
    
    fn read(&self) -> Vec<String> {
        let data = self.data.read().unwrap();
        data.clone()
    }
}

fn main() {
    let store = Arc::new(DataStore::new());
    let mut handles = vec![];
    
    // 写入线程
    for i in 0..5 {
        let store = Arc::clone(&store);
        let handle = thread::spawn(move || {
            store.write(format!("数据 {}", i));
        });
        handles.push(handle);
    }
    
    // 读取线程
    for _ in 0..3 {
        let store = Arc::clone(&store);
        let handle = thread::spawn(move || {
            let data = store.read();
            println!("读取到数据: {:?}", data);
        });
        handles.push(handle);
    }
    
    for handle in handles {
        handle.join().unwrap();
    }
}
```
</UniversalEditor>

---

## ⚡ 异步编程

### Rust 的异步编程

<UniversalEditor title="Rust 异步编程" compare={true}>
```rust !! rs
use tokio;
use std::time::Duration;

// 异步函数
async fn fetch_data() -> String {
    println!("开始获取数据");
    
    // 模拟网络请求
    tokio::time::sleep(Duration::from_secs(1)).await;
    
    println!("数据获取完成");
    String::from("Hello from server")
}

// 异步主函数
#[tokio::main]
async fn main() {
    println!("开始执行");
    
    // 调用异步函数
    let data = fetch_data().await;
    println!("处理数据: {}", data);
    
    println!("继续执行其他代码");
}

// 并发异步任务
async fn process_items_async(items: Vec<i32>) -> Vec<String> {
    let mut handles = vec![];
    
    for item in items {
        let handle = tokio::spawn(async move {
            // 模拟异步处理
            tokio::time::sleep(Duration::from_millis(100)).await;
            format!("processed_{}", item)
        });
        handles.push(handle);
    }
    
    // 等待所有任务完成
    let mut results = vec![];
    for handle in handles {
        let result = handle.await.unwrap();
        results.push(result);
    }
    
    results
}

#[tokio::main]
async fn main() {
    let items = vec![1, 2, 3, 4, 5];
    let results = process_items_async(items).await;
    println!("所有项目处理完成: {:?}", results);
}

// 异步通道
use tokio::sync::mpsc;

async fn async_channel_example() {
    let (tx, mut rx) = mpsc::channel(100);
    
    // 发送任务
    let sender = tokio::spawn(async move {
        for i in 0..10 {
            tx.send(format!("消息 {}", i)).await.unwrap();
            tokio::time::sleep(Duration::from_millis(100)).await;
        }
    });
    
    // 接收任务
    let receiver = tokio::spawn(async move {
        while let Some(message) = rx.recv().await {
            println!("收到: {}", message);
        }
    });
    
    // 等待两个任务完成
    let _ = tokio::join!(sender, receiver);
}

#[tokio::main]
async fn main() {
    async_channel_example().await;
}
```
</UniversalEditor>

### 异步与多线程对比

<UniversalEditor title="异步 vs 多线程" compare={true}>
```rust !! rs
use std::time::Duration;
use tokio;

// 多线程版本 - 适合 CPU 密集型任务
fn cpu_intensive_task() -> i32 {
    // 模拟 CPU 密集型计算
    let mut result = 0;
    for i in 0..1_000_000 {
        result += i;
    }
    result
}

fn multi_threaded_example() {
    let start = std::time::Instant::now();
    
    let handles: Vec<_> = (0..4).map(|_| {
        std::thread::spawn(|| {
            cpu_intensive_task()
        })
    }).collect();
    
    let results: Vec<i32> = handles.into_iter()
        .map(|h| h.join().unwrap())
        .collect();
    
    let duration = start.elapsed();
    println!("多线程耗时: {:?}, 结果: {:?}", duration, results);
}

// 异步版本 - 适合 I/O 密集型任务
async fn io_intensive_task() -> String {
    // 模拟 I/O 操作
    tokio::time::sleep(Duration::from_millis(100)).await;
    String::from("I/O 任务完成")
}

async fn async_example() {
    let start = std::time::Instant::now();
    
    let handles: Vec<_> = (0..100).map(|_| {
        tokio::spawn(async {
            io_intensive_task().await
        })
    }).collect();
    
    let results: Vec<String> = futures::future::join_all(handles).await
        .into_iter()
        .map(|r| r.unwrap())
        .collect();
    
    let duration = start.elapsed();
    println!("异步耗时: {:?}, 处理了 {} 个任务", duration, results.len());
}

#[tokio::main]
async fn main() {
    // 比较两种方法
    multi_threaded_example();
    async_example().await;
}
```
</UniversalEditor>

---

## 🎯 并发模式对比

### JavaScript 的并发模式

<UniversalEditor title="JavaScript 并发模式" compare={true}>
```javascript !! js
// JavaScript 的并发模式
class DataStore {
    constructor() {
        this.data = [];
        this.promise = Promise.resolve();
    }
    
    // 使用 Promise 链确保顺序执行
    async write(item) {
        this.promise = this.promise.then(async () => {
            await new Promise(resolve => setTimeout(resolve, 100));
            this.data.push(item);
            console.log(`写入: ${item}`);
        });
        return this.promise;
    }
    
    async read() {
        await this.promise;
        return [...this.data];
    }
}

// 使用示例
async function javascript_concurrency() {
    const store = new DataStore();
    
    // 并发写入
    const writePromises = [];
    for (let i = 0; i < 5; i++) {
        writePromises.push(store.write(`数据 ${i}`));
    }
    
    // 等待所有写入完成
    await Promise.all(writePromises);
    
    // 读取数据
    const data = await store.read();
    console.log("读取到数据:", data);
}

// Worker 线程（类似 Rust 的多线程）
if (typeof Worker !== 'undefined') {
    const worker = new Worker(`
        self.onmessage = function(e) {
            const result = e.data * 2;
            self.postMessage(result);
        };
    `);
    
    worker.onmessage = function(e) {
        console.log("Worker 计算结果:", e.data);
    };
    
    worker.postMessage(42);
}

javascript_concurrency();
```
</UniversalEditor>

### Rust 的并发模式

<UniversalEditor title="Rust 并发模式" compare={true}>
```rust !! rs
use std::sync::{Arc, Mutex};
use tokio::sync::RwLock;

// 线程安全的异步数据存储
struct AsyncDataStore {
    data: Arc<RwLock<Vec<String>>>,
}

impl AsyncDataStore {
    fn new() -> Self {
        AsyncDataStore {
            data: Arc::new(RwLock::new(Vec::new())),
        }
    }
    
    async fn write(&self, item: String) {
        let mut data = self.data.write().await;
        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
        data.push(item);
        println!("写入: {}", item);
    }
    
    async fn read(&self) -> Vec<String> {
        let data = self.data.read().await;
        data.clone()
    }
}

#[tokio::main]
async fn rust_concurrency() {
    let store = Arc::new(AsyncDataStore::new());
    
    // 并发写入
    let mut write_handles = vec![];
    for i in 0..5 {
        let store = Arc::clone(&store);
        let handle = tokio::spawn(async move {
            store.write(format!("数据 {}", i)).await;
        });
        write_handles.push(handle);
    }
    
    // 等待所有写入完成
    for handle in write_handles {
        handle.await.unwrap();
    }
    
    // 读取数据
    let data = store.read().await;
    println!("读取到数据: {:?}", data);
}

// 使用 rayon 进行并行计算
use rayon::prelude::*;

fn parallel_computation() {
    let numbers: Vec<i32> = (0..1_000_000).collect();
    
    let start = std::time::Instant::now();
    
    // 并行计算
    let sum: i64 = numbers.par_iter()
        .map(|&x| x as i64)
        .sum();
    
    let duration = start.elapsed();
    println!("并行计算耗时: {:?}, 结果: {}", duration, sum);
}

fn main() {
    // 运行异步并发示例
    rust_concurrency();
    
    // 运行并行计算示例
    parallel_computation();
}
```
</UniversalEditor>

---

## 🎯 练习题

### 练习 1: 线程安全计数器

创建一个线程安全的计数器，支持多个线程同时增加计数：

<details>
<summary>查看答案</summary>

```rust
use std::sync::{Arc, Mutex};
use std::thread;

struct Counter {
    count: Mutex<i32>,
}

impl Counter {
    fn new() -> Self {
        Counter {
            count: Mutex::new(0),
        }
    }
    
    fn increment(&self) {
        let mut count = self.count.lock().unwrap();
        *count += 1;
    }
    
    fn get_count(&self) -> i32 {
        *self.count.lock().unwrap()
    }
}

fn main() {
    let counter = Arc::new(Counter::new());
    let mut handles = vec![];
    
    for _ in 0..10 {
        let counter = Arc::clone(&counter);
        let handle = thread::spawn(move || {
            for _ in 0..100 {
                counter.increment();
            }
        });
        handles.push(handle);
    }
    
    for handle in handles {
        handle.join().unwrap();
    }
    
    println!("最终计数: {}", counter.get_count()); // 应该是 1000
}
```

</details>

### 练习 2: 异步任务处理

创建一个异步函数，并发处理多个任务并收集结果：

<details>
<summary>查看答案</summary>

```rust
use tokio;
use std::time::Duration;

async fn process_task(id: i32) -> String {
    tokio::time::sleep(Duration::from_millis(100)).await;
    format!("任务 {} 完成", id)
}

async fn process_all_tasks() -> Vec<String> {
    let mut handles = vec![];
    
    for i in 0..10 {
        let handle = tokio::spawn(async move {
            process_task(i).await
        });
        handles.push(handle);
    }
    
    let mut results = vec![];
    for handle in handles {
        let result = handle.await.unwrap();
        results.push(result);
    }
    
    results
}

#[tokio::main]
async fn main() {
    let results = process_all_tasks().await;
    println!("所有任务完成: {:?}", results);
}
```

</details>

### 练习 3: 生产者消费者模式

实现一个生产者消费者模式，使用通道进行线程间通信：

<details>
<summary>查看答案</summary>

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn producer_consumer() {
    let (tx, rx) = mpsc::channel();
    
    // 生产者
    let producer = thread::spawn(move || {
        for i in 0..10 {
            tx.send(format!("产品 {}", i)).unwrap();
            thread::sleep(Duration::from_millis(100));
        }
    });
    
    // 消费者
    let consumer = thread::spawn(move || {
        for received in rx {
            println!("消费: {}", received);
        }
    });
    
    producer.join().unwrap();
    consumer.join().unwrap();
}

fn main() {
    producer_consumer();
}
```

</details>

---

## 📝 总结

在这一章中，我们学习了 Rust 的并发和异步编程：

1. **执行模型**: Rust 支持真正的多线程并行，JavaScript 是单线程事件循环
2. **线程安全**: Rust 通过所有权系统在编译时保证线程安全
3. **异步编程**: Rust 的 async/await 语法与 JavaScript 类似
4. **并发模式**: 使用 Arc、Mutex、RwLock 等类型安全地共享状态
5. **性能选择**: 多线程适合 CPU 密集型，异步适合 I/O 密集型

### 关键要点

- Rust 的所有权系统确保线程安全
- 异步编程提供高性能的 I/O 处理
- 选择合适的并发模型很重要
- 编译时检查避免运行时错误

### 下一步学习

在下一章中，我们将学习 Rust 的类型系统和特征（Traits），了解如何构建可复用的抽象。

---

**继续学习**: [类型系统与特征](./module-05-type-system-traits) 